Module rok4_tools.pyr2pyr_utils.agent
Expand source code
import logging
import os
import tempfile
from typing import Dict, List, Tuple, Union
from rok4 import storage
from rok4.pyramid import Pyramid
def _remove_local_file(path: str) -> None:
    """Best-effort removal of a local temporary file: errors are ignored."""
    try:
        os.remove(path)
    except OSError:
        pass


def work(config: Dict, split: int) -> None:
    """Agent step: copy the slabs listed in the todo list of one split.

    The todo list ``todo.<split>.list`` is fetched from the process directory
    (``config["process"]["directory"]``) into a local temporary file. Every
    non-blank line must read ``cp <source> <destination> [<md5>]``; for each
    line, ``storage.copy`` copies the source slab to the destination, passing
    the MD5 when one is given.

    Resumption: if ``slab.<split>.last`` exists in the process directory, it
    contains only the destination path of the last slab copied by a previous
    run. Lines are then skipped up to and including the one with that
    destination, and copies restart from the next line.

    On success, ``slab.<split>.last`` is removed. On failure, the destination
    of the last slab handled is written to ``slab.<split>.last`` so that a new
    run can resume after it. The local copy of the todo list is removed in
    both cases.

    Args:
        config (Dict): PYR2PYR configuration
        split (int): Split number

    Raises:
        Exception: Cannot get the todo list (underlying error chained as cause)
        Exception: Cannot process the todo list, with the underlying error
            chained as cause (invalid todo list line, slab copy issue, missing
            object storage information, ...)
    """
    # Fetch the todo list into a local temporary file. Only its path is
    # needed: the handle opened by NamedTemporaryFile is closed straight away
    # so that no file descriptor leaks and storage.copy can write the file.
    local_todo_list = None
    try:
        with tempfile.NamedTemporaryFile(mode="r", delete=False) as tmp_file:
            local_todo_list = tmp_file.name
        storage.copy(
            os.path.join(config["process"]["directory"], f"todo.{split}.list"),
            f"file://{local_todo_list}",
        )
    except Exception as e:
        if local_todo_list is not None:
            _remove_local_file(local_todo_list)
        raise Exception(f"Cannot copy todo lists to final location: {e}") from e

    last_done_slab = None
    have_to_work = True
    last_done_fo = os.path.join(config["process"]["directory"], f"slab.{split}.last")

    try:
        if storage.exists(last_done_fo):
            # Strip surrounding whitespace (e.g. a trailing newline) so that the
            # comparison with the todo list destinations below can match
            last_done_slab = storage.get_data_str(last_done_fo).strip()
            logging.info(
                f"The last done slab file exists, last slab to have been copied is {last_done_slab}"
            )
            # Skip lines until the last copied slab is found again
            have_to_work = False

        # Read the local copy by name; the context manager closes it even on error
        with open(local_todo_list) as todo_list:
            for line in todo_list:
                line = line.rstrip()
                if not line:
                    # A blank line holds no command
                    continue
                parts = line.split(" ")
                if len(parts) not in (3, 4) or parts[0] != "cp":
                    raise Exception(
                        "Invalid todo list line: we need a cp command followed by 2 or 3 elements "
                        f"(source, destination and optional MD5): {line}"
                    )
                if not have_to_work:
                    if parts[2] == last_done_slab:
                        # The last copied slab is reached: it is not copied again,
                        # but every following line is
                        logging.info("Last copied slab reached, copies can start again")
                        have_to_work = True
                    continue
                slab_md5 = parts[3] if len(parts) == 4 else None
                storage.copy(parts[1], parts[2], slab_md5)
                last_done_slab = parts[2]

        if not have_to_work:
            # The recorded slab is absent from the todo list: nothing was copied
            logging.warning(
                f"Last copied slab {last_done_slab} not found in the todo list, "
                "no slab has been copied"
            )

        # Everything went well: the resumption file is no longer needed
        storage.remove(last_done_fo)
    except Exception as e:
        # Record the last slab handled so that a new run resumes right after it
        if last_done_slab is not None:
            storage.put_data_str(last_done_slab, last_done_fo)
        raise Exception(f"Cannot process the todo list: {e}") from e
    finally:
        # The local copy of the todo list is never needed again, whatever the outcome
        _remove_local_file(local_todo_list)
Functions
def work(config: Dict, split: int) -> None
-
Agent step: copy the slabs
Expects the configuration, the todo list and the optional last done slab file: if it exists, work does not start from the beginning, but after the last copied slab. This file contains only the destination path of the last processed slab.
For each line in the todo list, a slab copy is done.
Args
config
:Dict
- PYR2PYR configuration
split
:int
- Split number
Raises
Exception
- Cannot get todo list
Exception
- Invalid todo list line
StorageError
- Slab copy issue
MissingEnvironmentError
- Missing object storage information
Expand source code
def _remove_local_file(path: str) -> None:
    """Best-effort removal of a local temporary file: errors are ignored."""
    try:
        os.remove(path)
    except OSError:
        pass


def work(config: Dict, split: int) -> None:
    """Agent step: copy the slabs listed in the todo list of one split.

    The todo list ``todo.<split>.list`` is fetched from the process directory
    (``config["process"]["directory"]``) into a local temporary file. Every
    non-blank line must read ``cp <source> <destination> [<md5>]``; for each
    line, ``storage.copy`` copies the source slab to the destination, passing
    the MD5 when one is given.

    Resumption: if ``slab.<split>.last`` exists in the process directory, it
    contains only the destination path of the last slab copied by a previous
    run. Lines are then skipped up to and including the one with that
    destination, and copies restart from the next line.

    On success, ``slab.<split>.last`` is removed. On failure, the destination
    of the last slab handled is written to ``slab.<split>.last`` so that a new
    run can resume after it. The local copy of the todo list is removed in
    both cases.

    Args:
        config (Dict): PYR2PYR configuration
        split (int): Split number

    Raises:
        Exception: Cannot get the todo list (underlying error chained as cause)
        Exception: Cannot process the todo list, with the underlying error
            chained as cause (invalid todo list line, slab copy issue, missing
            object storage information, ...)
    """
    # Fetch the todo list into a local temporary file. Only its path is
    # needed: the handle opened by NamedTemporaryFile is closed straight away
    # so that no file descriptor leaks and storage.copy can write the file.
    local_todo_list = None
    try:
        with tempfile.NamedTemporaryFile(mode="r", delete=False) as tmp_file:
            local_todo_list = tmp_file.name
        storage.copy(
            os.path.join(config["process"]["directory"], f"todo.{split}.list"),
            f"file://{local_todo_list}",
        )
    except Exception as e:
        if local_todo_list is not None:
            _remove_local_file(local_todo_list)
        raise Exception(f"Cannot copy todo lists to final location: {e}") from e

    last_done_slab = None
    have_to_work = True
    last_done_fo = os.path.join(config["process"]["directory"], f"slab.{split}.last")

    try:
        if storage.exists(last_done_fo):
            # Strip surrounding whitespace (e.g. a trailing newline) so that the
            # comparison with the todo list destinations below can match
            last_done_slab = storage.get_data_str(last_done_fo).strip()
            logging.info(
                f"The last done slab file exists, last slab to have been copied is {last_done_slab}"
            )
            # Skip lines until the last copied slab is found again
            have_to_work = False

        # Read the local copy by name; the context manager closes it even on error
        with open(local_todo_list) as todo_list:
            for line in todo_list:
                line = line.rstrip()
                if not line:
                    # A blank line holds no command
                    continue
                parts = line.split(" ")
                if len(parts) not in (3, 4) or parts[0] != "cp":
                    raise Exception(
                        "Invalid todo list line: we need a cp command followed by 2 or 3 elements "
                        f"(source, destination and optional MD5): {line}"
                    )
                if not have_to_work:
                    if parts[2] == last_done_slab:
                        # The last copied slab is reached: it is not copied again,
                        # but every following line is
                        logging.info("Last copied slab reached, copies can start again")
                        have_to_work = True
                    continue
                slab_md5 = parts[3] if len(parts) == 4 else None
                storage.copy(parts[1], parts[2], slab_md5)
                last_done_slab = parts[2]

        if not have_to_work:
            # The recorded slab is absent from the todo list: nothing was copied
            logging.warning(
                f"Last copied slab {last_done_slab} not found in the todo list, "
                "no slab has been copied"
            )

        # Everything went well: the resumption file is no longer needed
        storage.remove(last_done_fo)
    except Exception as e:
        # Record the last slab handled so that a new run resumes right after it
        if last_done_slab is not None:
            storage.put_data_str(last_done_slab, last_done_fo)
        raise Exception(f"Cannot process the todo list: {e}") from e
    finally:
        # The local copy of the todo list is never needed again, whatever the outcome
        _remove_local_file(local_todo_list)