Module rok4_tools.joincache_utils.agent
Expand source code
import os
import tempfile
from typing import Dict, List, Tuple, Union
from rok4 import storage
from rok4.enums import SlabType
from rok4.pyramid import Level, Pyramid
def work(config: Dict, split: int) -> None:
"""Agent steps : make links or merge images
Expects the configuration, the todo list and the optionnal last done slab name : if exists, work
does not start from the beginning, but after the last copied slab. This file contains only the
destination path of the last processed slab.
A line in the todo list is either a slab's copy from pyramid format to work format, or a merge of
stacking slabs or a slab's copy from work format to pyramid format.
Args:
config (Dict): JOINCACHE configuration
split (int): Split number
Raises:
Exception: Cannot get todo list
Exception: Cannot load the input or output pyramid
Exception: Cannot process todo lists
Exception: System command raises an error
"""
# On récupère la todo list sous forme de fichier temporaire
try:
todo_list_obj = tempfile.NamedTemporaryFile(mode="r", delete=False)
storage.copy(
os.path.join(config["process"]["directory"], f"todo.{split}.list"),
f"file://{todo_list_obj.name}",
)
except Exception as e:
raise Exception(f"Cannot copy todo lists to final location: {e}")
try:
pyramid = Pyramid.from_descriptor(config["datasources"][0]["source"]["descriptors"][0])
except Exception as e:
raise Exception(
f"Cannot load source pyramid descriptor : {config['datasources'][0]['source']['descriptors'][0]} : {e}"
)
last_done_slab = None
last_done_fo = os.path.join(config["process"]["directory"], f"slab.{split}.last")
have_to_work = True
try:
if storage.exists(last_done_fo):
last_done_slab = storage.get_data_str(last_done_fo)
logging.info(
f"The last done slab file exists, last slab to have been copied is {last_done_slab}"
)
have_to_work = False
raster_specifications = pyramid.raster_specifications
format = pyramid.format
compression = format.split("_")[1].lower()
if "UINT" in format:
format_channel = "uint"
elif "FLOAT" in format:
format_channel = "float"
if "8" in format:
bits_channel = "8"
elif "32" in format:
bits_channel = "32"
multiple_slabs = False
with open(todo_list_obj.name) as file:
for line in file:
line = line.rstrip()
parts = line.split(" ")
if parts[0] == "link":
if not have_to_work:
if parts[1] == last_done_slab:
# On est retombé sur la dernière dalles traitées, on passe à la suivante mais on arrête de passer
logging.info(f"Last copied slab reached, copies can start again")
have_to_work = True
continue
storage.link(parts[2], parts[1])
if parts[0] == "c2w":
if not have_to_work:
continue
if not multiple_slabs:
i = 0
data_file = tempfile.NamedTemporaryFile(
mode="r", delete=False, suffix=".tif"
)
result_value = os.system(f"cache2work -c zip {parts[1]} {data_file.name}")
if result_value != 0:
raise Exception(f"cache2work raises an error")
data = [data_file]
mask = [""]
multiple_slabs = True
else:
slab_type = pyramid.get_infos_from_slab_path(parts[1])[0]
if slab_type == SlabType.MASK:
mask_file = tempfile.NamedTemporaryFile(
mode="r", delete=False, suffix=".tif"
)
result_value = os.system(
f"cache2work -c zip {parts[1]} {mask_file.name}"
)
if result_value != 0:
raise Exception(f"cache2work raises an error")
mask[i] = mask_file
else:
i += 1
data_file = tempfile.NamedTemporaryFile(
mode="r", delete=False, suffix=".tif"
)
result_value = os.system(
f"cache2work -c zip {parts[1]} {data_file.name}"
)
if result_value != 0:
raise Exception(f"cache2work raises an error")
data += [data_file]
mask += [""]
if parts[0] == "oNt":
if not have_to_work:
continue
result = tempfile.NamedTemporaryFile(mode="r", delete=False, suffix=".tif")
file_tiff = f"{result.name}"
if config["pyramid"]["mask"]:
result_mask = tempfile.NamedTemporaryFile(
mode="r", delete=False, suffix=".tif"
)
file_tiff += f" {result_mask.name}\n"
else:
file_tiff += "\n"
for i in range(len(data) - 1, -1, -1):
file_tiff += f"{data[i].name}"
if mask[i] != "":
if i == 0:
file_tiff += f" {mask[i].name}"
else:
file_tiff += f" {mask[i].name}\n"
else:
if i != 0:
file_tiff += "\n"
fichier = tempfile.NamedTemporaryFile(mode="r", delete=False, suffix=".txt")
with open(fichier.name, "w") as f:
f.write(file_tiff)
result_value = os.system(
f"overlayNtiff -f {fichier.name} -m TOP -b {raster_specifications['nodata']} -c zip -s {raster_specifications['channels']} -p {raster_specifications['photometric']}"
)
if result_value != 0:
raise Exception(f"overlayNtiff raises an error")
storage.remove(f"file://{fichier.name}")
for i in range(len(data)):
storage.remove(f"file://{data[i].name}")
pass
for i in range(len(mask)):
if mask[i] != "":
pass
storage.remove(f"file://{mask[i].name}")
multiple_slabs = False
if parts[0] == "w2c":
if not have_to_work:
if parts[1] == last_done_slab:
# On est retombé sur la dernière dalles traitées, on passe à la suivante mais on arrête de passer
logging.info(f"Last copied slab reached, copies can start again")
have_to_work = True
continue
to_tray = storage.get_infos_from_path(parts[1])[2]
if to_tray != "":
os.makedirs(to_tray, exist_ok=True)
slab_type = pyramid.get_infos_from_slab_path(parts[1])[0]
if slab_type == SlabType.DATA:
level = pyramid.get_infos_from_slab_path(parts[1])[1]
tile_width = pyramid.tms.get_level(level).tile_width
tile_heigth = pyramid.tms.get_level(level).tile_heigth
result_value = os.system(
f"work2cache -c {compression} -t {tile_width} {tile_heigth} -a {format_channel} -b {bits_channel} -s {raster_specifications['channels']} {result.name} {parts[1]}"
)
if result_value != 0:
raise Exception(f"work2cache raises an error")
storage.remove(f"file://{result.name}")
elif slab_type == SlabType.MASK:
level = pyramid.get_infos_from_slab_path(parts[1])[1]
tile_width = pyramid.tms.get_level(level).tile_width
tile_heigth = pyramid.tms.get_level(level).tile_heigth
result_value = os.system(
f"work2cache -c zip -t {tile_width} {tile_heigth} -a {format_channel} -b {bits_channel} -s {raster_specifications['channels']} {result_mask.name} {parts[1]}"
)
if result_value != 0:
raise Exception(f"work2cache raises an error")
storage.remove(f"file://{result_mask.name}")
# On nettoie les fichiers locaux et comme tout s'est bien passé, on peut supprimer aussi le fichier local du travail fait
todo_list_obj.close()
storage.remove(f"file://{todo_list_obj.name}")
storage.remove(last_done_fo)
except Exception as e:
if last_done_slab is not None:
storage.put_data_str(last_done_slab, last_done_fo)
raise Exception(f"Cannot process the todo list: {e}")
Functions
def work(config: Dict[~KT, ~VT], split: int) ‑> None
-
Agent steps : make links or merge images
Expects the configuration, the todo list and the optionnal last done slab name : if exists, work does not start from the beginning, but after the last copied slab. This file contains only the destination path of the last processed slab.
A line in the todo list is either a slab's copy from pyramid format to work format, or a merge of stacking slabs or a slab's copy from work format to pyramid format.
Args
config
:Dict
- JOINCACHE configuration
split
:int
- Split number
Raises
Exception
- Cannot get todo list
Exception
- Cannot load the input or output pyramid
Exception
- Cannot process todo lists
Exception
- System command raises an error
Expand source code
def work(config: Dict, split: int) -> None: """Agent steps : make links or merge images Expects the configuration, the todo list and the optionnal last done slab name : if exists, work does not start from the beginning, but after the last copied slab. This file contains only the destination path of the last processed slab. A line in the todo list is either a slab's copy from pyramid format to work format, or a merge of stacking slabs or a slab's copy from work format to pyramid format. Args: config (Dict): JOINCACHE configuration split (int): Split number Raises: Exception: Cannot get todo list Exception: Cannot load the input or output pyramid Exception: Cannot process todo lists Exception: System command raises an error """ # On récupère la todo list sous forme de fichier temporaire try: todo_list_obj = tempfile.NamedTemporaryFile(mode="r", delete=False) storage.copy( os.path.join(config["process"]["directory"], f"todo.{split}.list"), f"file://{todo_list_obj.name}", ) except Exception as e: raise Exception(f"Cannot copy todo lists to final location: {e}") try: pyramid = Pyramid.from_descriptor(config["datasources"][0]["source"]["descriptors"][0]) except Exception as e: raise Exception( f"Cannot load source pyramid descriptor : {config['datasources'][0]['source']['descriptors'][0]} : {e}" ) last_done_slab = None last_done_fo = os.path.join(config["process"]["directory"], f"slab.{split}.last") have_to_work = True try: if storage.exists(last_done_fo): last_done_slab = storage.get_data_str(last_done_fo) logging.info( f"The last done slab file exists, last slab to have been copied is {last_done_slab}" ) have_to_work = False raster_specifications = pyramid.raster_specifications format = pyramid.format compression = format.split("_")[1].lower() if "UINT" in format: format_channel = "uint" elif "FLOAT" in format: format_channel = "float" if "8" in format: bits_channel = "8" elif "32" in format: bits_channel = "32" multiple_slabs = False with open(todo_list_obj.name) as file: for line in file: line = line.rstrip() parts = line.split(" ") if parts[0] == "link": if not have_to_work: if parts[1] == last_done_slab: # On est retombé sur la dernière dalles traitées, on passe à la suivante mais on arrête de passer logging.info(f"Last copied slab reached, copies can start again") have_to_work = True continue storage.link(parts[2], parts[1]) if parts[0] == "c2w": if not have_to_work: continue if not multiple_slabs: i = 0 data_file = tempfile.NamedTemporaryFile( mode="r", delete=False, suffix=".tif" ) result_value = os.system(f"cache2work -c zip {parts[1]} {data_file.name}") if result_value != 0: raise Exception(f"cache2work raises an error") data = [data_file] mask = [""] multiple_slabs = True else: slab_type = pyramid.get_infos_from_slab_path(parts[1])[0] if slab_type == SlabType.MASK: mask_file = tempfile.NamedTemporaryFile( mode="r", delete=False, suffix=".tif" ) result_value = os.system( f"cache2work -c zip {parts[1]} {mask_file.name}" ) if result_value != 0: raise Exception(f"cache2work raises an error") mask[i] = mask_file else: i += 1 data_file = tempfile.NamedTemporaryFile( mode="r", delete=False, suffix=".tif" ) result_value = os.system( f"cache2work -c zip {parts[1]} {data_file.name}" ) if result_value != 0: raise Exception(f"cache2work raises an error") data += [data_file] mask += [""] if parts[0] == "oNt": if not have_to_work: continue result = tempfile.NamedTemporaryFile(mode="r", delete=False, suffix=".tif") file_tiff = f"{result.name}" if config["pyramid"]["mask"]: result_mask = tempfile.NamedTemporaryFile( mode="r", delete=False, suffix=".tif" ) file_tiff += f" {result_mask.name}\n" else: file_tiff += "\n" for i in range(len(data) - 1, -1, -1): file_tiff += f"{data[i].name}" if mask[i] != "": if i == 0: file_tiff += f" {mask[i].name}" else: file_tiff += f" {mask[i].name}\n" else: if i != 0: file_tiff += "\n" fichier = tempfile.NamedTemporaryFile(mode="r", delete=False, suffix=".txt") with open(fichier.name, "w") as f: f.write(file_tiff) result_value = os.system( f"overlayNtiff -f {fichier.name} -m TOP -b {raster_specifications['nodata']} -c zip -s {raster_specifications['channels']} -p {raster_specifications['photometric']}" ) if result_value != 0: raise Exception(f"overlayNtiff raises an error") storage.remove(f"file://{fichier.name}") for i in range(len(data)): storage.remove(f"file://{data[i].name}") pass for i in range(len(mask)): if mask[i] != "": pass storage.remove(f"file://{mask[i].name}") multiple_slabs = False if parts[0] == "w2c": if not have_to_work: if parts[1] == last_done_slab: # On est retombé sur la dernière dalles traitées, on passe à la suivante mais on arrête de passer logging.info(f"Last copied slab reached, copies can start again") have_to_work = True continue to_tray = storage.get_infos_from_path(parts[1])[2] if to_tray != "": os.makedirs(to_tray, exist_ok=True) slab_type = pyramid.get_infos_from_slab_path(parts[1])[0] if slab_type == SlabType.DATA: level = pyramid.get_infos_from_slab_path(parts[1])[1] tile_width = pyramid.tms.get_level(level).tile_width tile_heigth = pyramid.tms.get_level(level).tile_heigth result_value = os.system( f"work2cache -c {compression} -t {tile_width} {tile_heigth} -a {format_channel} -b {bits_channel} -s {raster_specifications['channels']} {result.name} {parts[1]}" ) if result_value != 0: raise Exception(f"work2cache raises an error") storage.remove(f"file://{result.name}") elif slab_type == SlabType.MASK: level = pyramid.get_infos_from_slab_path(parts[1])[1] tile_width = pyramid.tms.get_level(level).tile_width tile_heigth = pyramid.tms.get_level(level).tile_heigth result_value = os.system( f"work2cache -c zip -t {tile_width} {tile_heigth} -a {format_channel} -b {bits_channel} -s {raster_specifications['channels']} {result_mask.name} {parts[1]}" ) if result_value != 0: raise Exception(f"work2cache raises an error") storage.remove(f"file://{result_mask.name}") # On nettoie les fichiers locaux et comme tout s'est bien passé, on peut supprimer aussi le fichier local du travail fait todo_list_obj.close() storage.remove(f"file://{todo_list_obj.name}") storage.remove(last_done_fo) except Exception as e: if last_done_slab is not None: storage.put_data_str(last_done_slab, last_done_fo) raise Exception(f"Cannot process the todo list: {e}")