Coverage for src/rok4_tools/joincache_utils/agent.py: 64%
137 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-06 17:15 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-06 17:15 +0000
1import os
2import tempfile
3from typing import Dict, List, Tuple, Union
5from rok4 import storage
6from rok4.enums import SlabType
7from rok4.pyramid import Level, Pyramid
def _new_temp_path(suffix: str = "") -> str:
    """Create an empty temporary file and return its path.

    The file handle is closed immediately so that processing a long todo list
    does not accumulate open file descriptors. The file itself is kept on disk
    (delete=False) and has to be removed by the caller.
    """
    handle = tempfile.NamedTemporaryFile(mode="r", delete=False, suffix=suffix)
    handle.close()
    return handle.name


def _run_command(command: str, tool: str) -> None:
    """Run a shell command and raise if its exit status is not zero."""
    if os.system(command) != 0:
        raise Exception(f"{tool} raises an error")


def _cache2work(slab_path: str) -> str:
    """Convert a slab to a temporary work TIFF with cache2work and return the TIFF path."""
    work_path = _new_temp_path(".tif")
    _run_command(f"cache2work -c zip {slab_path} {work_path}", "cache2work")
    return work_path


def work(config: Dict, split: int) -> None:
    """Agent steps : make links or merge images

    Expects the configuration, the todo list and the optional last done slab name : if it exists,
    work does not start from the beginning, but after the last copied slab. This file contains only
    the destination path of the last processed slab.

    A line in the todo list is one of:

    * ``link <destination> <target>`` : link a slab
    * ``c2w <slab>`` : copy a slab from pyramid format to work format (stacked until the next oNt)
    * ``oNt`` : merge the stacked work images
    * ``w2c <slab>`` : copy the merged image from work format to pyramid format

    Args:
        config (Dict): JOINCACHE configuration
        split (int): Split number

    Raises:
        Exception: Cannot get todo list
        Exception: Cannot load the input or output pyramid
        Exception: Cannot process todo lists
        Exception: System command raises an error
    """
    # Imported locally: the module's import block does not provide logging
    import logging

    # Fetch the todo list as a local temporary file
    todo_list_path = None
    try:
        todo_list_path = _new_temp_path()
        storage.copy(
            os.path.join(config["process"]["directory"], f"todo.{split}.list"),
            f"file://{todo_list_path}",
        )
    except Exception as e:
        # Do not leave an orphan temporary file behind when the fetch fails
        if todo_list_path is not None and os.path.exists(todo_list_path):
            os.remove(todo_list_path)
        raise Exception(f"Cannot get the todo list: {e}") from e

    try:
        pyramid = Pyramid.from_descriptor(config["datasources"][0]["source"]["descriptors"][0])
    except Exception as e:
        raise Exception(
            f"Cannot load source pyramid descriptor : {config['datasources'][0]['source']['descriptors'][0]} : {e}"
        ) from e

    last_done_slab = None
    last_done_fo = os.path.join(config["process"]["directory"], f"slab.{split}.last")
    have_to_work = True

    try:
        if storage.exists(last_done_fo):
            last_done_slab = storage.get_data_str(last_done_fo)
            logging.info(
                f"The last done slab file exists, last slab to have been copied is {last_done_slab}"
            )
            # Skip every instruction until the last done slab is met again
            have_to_work = False

        raster_specifications = pyramid.raster_specifications
        pyramid_format = pyramid.format
        # The compression is the second underscore-separated field of the format name
        compression = pyramid_format.split("_")[1].lower()
        if "UINT" in pyramid_format:
            format_channel = "uint"
        elif "FLOAT" in pyramid_format:
            format_channel = "float"
        if "8" in pyramid_format:
            bits_channel = "8"
        elif "32" in pyramid_format:
            bits_channel = "32"

        # True while work images are being stacked for the next merge
        multiple_slabs = False
        with open(todo_list_path) as file:
            for line in file:
                parts = line.rstrip().split(" ")
                action = parts[0]

                if action == "link":
                    if not have_to_work:
                        if parts[1] == last_done_slab:
                            # The last processed slab is reached: skip it, but stop skipping afterwards
                            logging.info("Last copied slab reached, copies can start again")
                            have_to_work = True

                        continue

                    storage.link(parts[2], parts[1])

                elif action == "c2w":
                    if not have_to_work:
                        continue

                    if not multiple_slabs:
                        # First slab of a new stack
                        data = [_cache2work(parts[1])]
                        mask = [""]
                        multiple_slabs = True
                    elif pyramid.get_infos_from_slab_path(parts[1])[0] == SlabType.MASK:
                        # A mask is attached to the most recently stacked data slab
                        mask[-1] = _cache2work(parts[1])
                    else:
                        data.append(_cache2work(parts[1]))
                        mask.append("")

                elif action == "oNt":
                    if not have_to_work:
                        continue

                    # First line of the overlayNtiff configuration: output image (and output mask)
                    result = _new_temp_path(".tif")
                    header = result
                    if config["pyramid"]["mask"]:
                        result_mask = _new_temp_path(".tif")
                        header += f" {result_mask}"

                    # Following lines: stacked images in reverse order, each with its mask if any
                    overlay_lines = [header]
                    for data_path, mask_path in zip(reversed(data), reversed(mask)):
                        overlay_lines.append(f"{data_path} {mask_path}" if mask_path else data_path)

                    overlay_config = _new_temp_path(".txt")
                    with open(overlay_config, "w") as f:
                        f.write("\n".join(overlay_lines))

                    _run_command(
                        f"overlayNtiff -f {overlay_config} -m TOP -b {raster_specifications['nodata']} -c zip -s {raster_specifications['channels']} -p {raster_specifications['photometric']}",
                        "overlayNtiff",
                    )

                    # The merge inputs are not needed anymore
                    storage.remove(f"file://{overlay_config}")
                    for data_path in data:
                        storage.remove(f"file://{data_path}")
                    for mask_path in mask:
                        if mask_path:
                            storage.remove(f"file://{mask_path}")
                    multiple_slabs = False

                elif action == "w2c":
                    if not have_to_work:
                        if parts[1] == last_done_slab:
                            # The last processed slab is reached: skip it, but stop skipping afterwards
                            logging.info("Last copied slab reached, copies can start again")
                            have_to_work = True

                        continue

                    to_tray = storage.get_infos_from_path(parts[1])[2]
                    if to_tray != "":
                        os.makedirs(to_tray, exist_ok=True)

                    slab_infos = pyramid.get_infos_from_slab_path(parts[1])
                    slab_type = slab_infos[0]
                    if slab_type == SlabType.DATA or slab_type == SlabType.MASK:
                        # NOTE(review): attribute names (including the "tile_heigth" spelling) are
                        # kept as found - TODO confirm against the tile matrix API
                        tile_matrix = pyramid.tms.get_level(slab_infos[1])
                        tile_width = tile_matrix.tile_width
                        tile_heigth = tile_matrix.tile_heigth

                        if slab_type == SlabType.DATA:
                            # Data slabs use the pyramid's compression and the merged image
                            slab_compression, work_image = compression, result
                        else:
                            # Mask slabs are always zip compressed and come from the merged mask
                            slab_compression, work_image = "zip", result_mask

                        _run_command(
                            f"work2cache -c {slab_compression} -t {tile_width} {tile_heigth} -a {format_channel} -b {bits_channel} -s {raster_specifications['channels']} {work_image} {parts[1]}",
                            "work2cache",
                        )
                        storage.remove(f"file://{work_image}")

        # Everything went well: remove the local todo list and the last done slab file
        storage.remove(f"file://{todo_list_path}")
        storage.remove(last_done_fo)

    except Exception as e:
        if last_done_slab is not None:
            storage.put_data_str(last_done_slab, last_done_fo)
        raise Exception(f"Cannot process the todo list: {e}") from e