Coverage for src/rok4_tools/joincache_utils/master.py: 92%
133 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-06 17:15 +0000
1import itertools
2import os
3import tempfile
4from typing import Dict, List, Tuple, Union
6from rok4 import storage
7from rok4.enums import PyramidType, SlabType
8from rok4.pyramid import Pyramid
10from rok4_tools.global_utils.source import SourcePyramids
12"""Todo list instructions
14* c2w <source slab> - Convert a source pyramid slab (MASK or DATA) to work format
15* oNt - Overlay previous converted data slabs (considering possible masks)
16* w2c <destination slab> - Convert the output of last overlayNtiff to slab format, into the output pyramid
17* link <destination slab> <source slab> <source index> - Make a symbolic link to the source slab. Source index will be used to generate the pyramid's list
19"""
def work(config: Dict) -> None:
    """Master steps : prepare and split copies and merge to do

    Load and check input pyramids from the descriptors and write the todo lists,
    splitting the copies or merge to do between the parallelized agents.

    Args:
        config (Dict): JOINCACHE configuration

    Raises:
        Exception: Cannot load the input or the output pyramid
        Exception: S3 cluster host have not to be provided into bucket names (output or inputs)
        Exception: Sources pyramid have different features
        Exception: Cannot open stream to write todo lists
        Exception: Cannot copy todo lists
    """

    # Load every datasource and check that all source pyramids share the same
    # TMS, format and channels count
    datasources = []
    tms_reference = None
    format_reference = None
    channels_reference = None
    for datasource in config["datasources"]:
        sources = SourcePyramids(
            datasource["bottom"],
            datasource["top"],
            datasource["source"]["descriptors"],
        )
        # TMS unicity check
        if not tms_reference:
            tms_reference = sources.tms
        elif tms_reference.name != sources.tms.name:
            raise Exception(
                f"Sources pyramids cannot have two different TMS : {tms_reference} and {sources.tms}"
            )

        # Format unicity check
        if not format_reference:
            format_reference = sources.format
        elif format_reference != sources.format:
            raise Exception(
                f"Sources pyramids cannot have two different format : {format_reference} and {sources.format}"
            )

        # Channels count unicity check
        if not channels_reference:
            channels_reference = sources.channels
        elif channels_reference != sources.channels:
            raise Exception(
                f"Sources pyramids cannot have two different numbers of channels : {channels_reference} and {sources.channels}"
            )

        # Pyramid type check: JOINCACHE only handles raster pyramids
        if sources.type != PyramidType.RASTER:
            raise Exception("Some sources pyramids are not a raster")

        datasources.append(sources)

    # Load the pyramid to write
    storage_pyramid = {
        "type": datasources[0].pyramids[0].storage_type,
        "root": config["pyramid"]["root"],
    }
    try:
        to_pyramid = Pyramid.from_other(
            datasources[0].pyramids[0],
            config["pyramid"]["name"],
            storage_pyramid,
            mask=config["pyramid"]["mask"],
        )
    except Exception as e:
        raise Exception(
            f"Cannot create the destination pyramid descriptor from the source one: {e}"
        ) from e

    if to_pyramid.storage_s3_cluster is not None:
        # Only one S3 cluster can be used: the cluster host must not appear in the paths
        raise Exception(
            f"Do not set S3 cluster host into output bucket name ({config['pyramid']['root']}) : only one cluster can be used with JOINCACHE"
        )

    # Open the streams to the todo lists (one per agent, plus the finisher's)
    temp_agent_todos = []
    temp_finisher_todo = None
    try:
        for i in range(0, config["process"]["parallelization"]):
            tmp = tempfile.NamedTemporaryFile(mode="w", delete=False)
            temp_agent_todos.append(tmp)

        temp_finisher_todo = tempfile.NamedTemporaryFile(mode="w", delete=False)

    except Exception as e:
        raise Exception(f"Cannot open stream to write todo lists: {e}") from e

    # Work is balanced between the agents with a simple round robin
    round_robin = itertools.cycle(temp_agent_todos)

    # Sets give O(1) membership tests (slab keys are hashable tuples)
    slab_finish = set()  # slab keys (type, level, column, row) already processed
    level_finish = set()  # level identifiers already defined by a datasource
    used_pyramids_roots = {}  # source pyramid root -> index for the output list
    used_pyramids_count = 0

    for sources in datasources:
        from_pyramids = sources.pyramids
        levels = from_pyramids[0].get_levels(sources.bottom, sources.top)
        for level in levels:
            # Several datasources must not define the same level
            if level.id not in level_finish:
                try:
                    to_pyramid.delete_level(level.id)
                except Exception:
                    # Best effort: the level may not exist yet in the output pyramid
                    pass
                info = sources.info_level(level.id)
                to_pyramid.add_level(level.id, info[0], info[1], info[2])
                level_finish.add(level.id)
            else:
                raise Exception(f"Different datasources cannot define the same level : {level.id}")

            for i in range(len(from_pyramids)):
                # List the slabs of the source pyramid (two generators over the same list:
                # one for the data slabs, one to look the masks up)
                slabs = from_pyramids[i].list_generator(level.id)
                slabs_mask = from_pyramids[i].list_generator(level.id)
                for slab in slabs:
                    # Skip slabs already processed through a previous source pyramid
                    if slab[0] in slab_finish:
                        continue
                    if slab[0][0].name == "MASK":
                        continue
                    from_slab_path = storage.get_path_from_infos(
                        from_pyramids[i].storage_type, slab[1]["root"], slab[1]["slab"]
                    )
                    process = [from_slab_path]
                    slab_finish.add(slab[0])

                    # Look for the mask matching the data slab
                    if config["process"]["mask"]:
                        slab_mask = (
                            (SlabType["MASK"], slab[0][1], slab[0][2], slab[0][3]),
                            slab[1],
                        )
                        slab_mask[1]["slab"] = from_pyramids[i].get_slab_path_from_infos(
                            SlabType["MASK"], slab[0][1], slab[0][2], slab[0][3], False
                        )
                        # NOTE(review): this membership test consumes the slabs_mask
                        # generator, so it relies on the list ordering — confirm
                        if slab_mask in slabs_mask:
                            mask = [
                                storage.get_path_from_infos(
                                    from_pyramids[i].storage_type,
                                    slab_mask[1]["root"],
                                    slab_mask[1]["slab"],
                                )
                            ]
                        else:
                            mask = [""]

                    # Look for the same slab in the next source pyramids
                    if not config["process"]["only_links"]:
                        for j in range(i + 1, len(from_pyramids)):
                            slabs_other = from_pyramids[j].list_generator(level.id)
                            slabs_other_mask = from_pyramids[j].list_generator(level.id)
                            for slab_other in slabs_other:
                                if slab_other[0] == slab[0]:
                                    from_slab_path_other = storage.get_path_from_infos(
                                        from_pyramids[j].storage_type,
                                        slab_other[1]["root"],
                                        slab_other[1]["slab"],
                                    )
                                    process += [from_slab_path_other]
                                    if config["process"]["mask"]:
                                        slab_mask_other = (
                                            (
                                                SlabType["MASK"],
                                                slab_other[0][1],
                                                slab_other[0][2],
                                                slab_other[0][3],
                                            ),
                                            slab_other[1],
                                        )
                                        slab_mask_other[1]["slab"] = from_pyramids[
                                            j
                                        ].get_slab_path_from_infos(
                                            SlabType["MASK"],
                                            slab_other[0][1],
                                            slab_other[0][2],
                                            slab_other[0][3],
                                            False,
                                        )
                                        if slab_mask_other in slabs_other_mask:
                                            mask += [
                                                storage.get_path_from_infos(
                                                    # Bugfix: this mask belongs to
                                                    # from_pyramids[j] — the original
                                                    # used from_pyramids[i] here,
                                                    # unlike the data path above
                                                    from_pyramids[j].storage_type,
                                                    slab_mask_other[1]["root"],
                                                    slab_mask_other[1]["slab"],
                                                )
                                            ]
                                        else:
                                            mask += [""]
                                    continue

                    to_slab_path = to_pyramid.get_slab_path_from_infos(
                        slab[0][0], slab[0][1], slab[0][2], slab[0][3]
                    )
                    if config["pyramid"]["mask"]:
                        to_slab_path_mask = to_pyramid.get_slab_path_from_infos(
                            SlabType["MASK"], slab[0][1], slab[0][2], slab[0][3]
                        )

                    # Write the commands in the todo lists
                    if len(process) == 1:
                        # Only one source slab: a simple link is enough
                        if slab[1]["root"] in used_pyramids_roots:
                            root_index = used_pyramids_roots[slab[1]["root"]]
                        else:
                            used_pyramids_count += 1
                            root_index = used_pyramids_count
                            used_pyramids_roots[slab[1]["root"]] = used_pyramids_count

                        next(round_robin).write(
                            f"link {to_slab_path} {from_slab_path} {root_index}\n"
                        )
                        if config["pyramid"]["mask"]:
                            if mask[0] != "":
                                next(round_robin).write(
                                    f"link {to_slab_path_mask} {mask[0]} {root_index}\n"
                                )
                    else:
                        # Several source slabs: convert them to work format, overlay
                        # them and convert the result back into the output pyramid
                        command = ""
                        for j in range(len(process)):
                            command += f"c2w {process[j]}\n"
                            if config["process"]["mask"]:
                                if mask[j] != "":
                                    command += f"c2w {mask[j]}\n"
                        command += "oNt\n"
                        command += f"w2c {to_slab_path}\n"
                        if config["pyramid"]["mask"]:
                            command += f"w2c {to_slab_path_mask}\n"
                        next(round_robin).write(command)

    # The finisher todo list maps each used source root to its index
    for root, index in used_pyramids_roots.items():
        temp_finisher_todo.write(f"{index}={root}\n")

    # Copy the todo lists to the shared location (possibly object storage)
    try:
        for i in range(0, config["process"]["parallelization"]):
            tmp = temp_agent_todos[i]
            tmp.close()
            storage.copy(
                f"file://{tmp.name}",
                os.path.join(config["process"]["directory"], f"todo.{i+1}.list"),
            )
            storage.remove(f"file://{tmp.name}")

        temp_finisher_todo.close()
        storage.copy(
            f"file://{temp_finisher_todo.name}",
            os.path.join(config["process"]["directory"], "todo.finisher.list"),
        )
        storage.remove(f"file://{temp_finisher_todo.name}")

    except Exception as e:
        raise Exception(f"Cannot copy todo lists to final location and clean: {e}") from e